Go 并发编程-3.Mutex拓展

Try Lock

在实际开发中,如果要更新配置数据,我们通常需要加锁,这样可以避免同时有多个 goroutine 并发修改数据。有的时候,我们也会使用 TryLock。这样一来,当某个 goroutine 想要更改配置数据时,如果发现已经有 goroutine 在更改了,其他的 goroutine 调用 TryLock,返回了 false,这个 goroutine 就会放弃更改。

官方实现

在 Go 1.18 官方标准库中,已经为 Mutex/RWMutex 增加了 TryLock 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (
"fmt"
"sync"
)

func main() {
var mu sync.Mutex
if mu.TryLock() {
fmt.Println("TryLock Success")
defer mu.Unlock()
}
}

TryLock:

当一个 goroutine 调用这个 TryLock 方法请求锁的时候,如果这把锁没有被其他 goroutine 所持有,那么,这个 goroutine 就持有了这把锁,并返回 true;如果这把锁已经被其他 goroutine 所持有,或者是正在准备交给某个被唤醒的 goroutine,那么,这个请求锁的 goroutine 就直接返回 false,不会阻塞在方法调用上。

自定义拓展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

// 复制Mutex定义的常量
const (
mutexLocked = 1 << iota // 加锁标识位置,值为1
mutexWoken // 唤醒标识位置,值为2
mutexStarving // 锁饥饿标识位置,值为4
mutexWaiterShift = iota // 标识waiter的起始bit位置
)

// 扩展一个Mutex结构
type Mutex struct {
sync.Mutex
}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
// 如果能成功抢到锁
if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {
return true
}

// 如果处于唤醒、加锁或者饥饿状态,这次请求就不参与竞争了,返回false
fmt.Println(unsafe.Pointer(&m.Mutex)) // 0xc0000200c0,这里获取到的是地址
fmt.Println(*(*int32)(unsafe.Pointer(&m.Mutex))) // 1,这里获取到的是值
old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {
return false
}

// 尝试在竞争的状态下请求锁
new := old | mutexLocked
return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
}

atomic.CompareAndSwapInt32 方法是 Go 语言中的原子操作方法,作用是比较并交换指定内存地址中的值。其函数签名为:

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

其中,addr 是待操作的内存地址,oldnew 分别表示期望值和新值,如果地址 addr 中的值等于期望值 old,则将其替换为新值 new 并返回 true,否则不进行任何操作并返回 false。

该方法被广泛用于多线程编程中的同步操作,常用于实现一些基本的同步机制,例如自旋锁、互斥锁等。由于该方法是原子操作,因此可以避免竞态条件等多线程编程中的一些问题。

unsafe.Pointer:

unsafe.Pointer() 是 Go 语言中的一个指针类型转换函数,可以将任意类型的指针转换为 unsafe.Pointer 类型的指针, unsafe.Pointer 可以忽略类型安全检查。

因此,unsafe.Pointer(&m.Mutex) 的含义是将 &m.Mutex 转换为 unsafe.Pointer 类型的指针。

这里使用了 unsafe.Pointer 进行指针类型之间的转换,因为 Mutex 结构体中的字段并没有导出,无法直接对其进行访问。但是我们可以通过将 &m.Mutex 转换为 unsafe.Pointer 类型的指针,再通过 (*int32) 强制类型转换,来获取 Mutex 结构体中一个叫做 state 的私有字段的值。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

func try() {
var mu Mutex
go func() { // 启动一个goroutine持有一段时间的锁
mu.Lock()
time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
mu.Unlock()
}()

time.Sleep(time.Second)

ok := mu.TryLock() // 尝试获取到锁
if ok { // 获取成功
fmt.Println("got the lock")
// do something
mu.Unlock()
return
}

// 没有获取到
fmt.Println("can't get the lock")
}

获取锁的状态

先来回顾一下 Mutex 的数据结构,如下面的代码所示。它包含两个字段,state 和 sema。前四个字节(int32)就是 state 字段。

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

Mutex 结构中的 state 字段有很多个含义,通过 state 字段,你可以知道锁是否已经被某个 goroutine 持有、当前是否处于饥饿状态、是否有等待的 goroutine 被唤醒、等待者的数量等信息,但是,state 这个字段并没有暴露出来,但我们可以通过 unsafe 的方式实现。

nsafe:

在 Go 语言中,使用 unsafe 包可以绕过类型系统的限制,直接操作内存。通过 unsafe.Pointer 类型,可以将任何类型转换为指针类型,并可以进行指针运算和指针类型转换。但是,使用 unsafe 包需要非常小心,因为不当使用可能会导致程序崩溃或安全漏洞。

如果您想要获取一个未暴露的结构体字段,可以使用 unsafe 包中的 Offsetof()Ptr() 函数。假设有以下结构体:

type MyStruct struct {
ExportedField int
unexportedField string
}

其中 unexportedField 是私有字段,不能被外部包访问。但是,您可以使用以下代码获取该字段的地址:

// 实例话结构体
myStruct := MyStruct{ExportedField: 42, unexportedField: “Hello”}

// 获取 myStruct 的地址
ptrToMyStruct := &myStruct

// unsafe.Offsetof() 函数获取 unexportedField 字段在 MyStruct 结构体中的偏移量,然后将其加上结构体的起始地址,得到了该字段的地址。最后,我们将地址转换为 unsafe.Pointer 类型,然后转换为 string 类型。
ptrToUnexportedField := (*string)(unsafe.Pointer(uintptr(unsafe.Pointer(ptrToMyStruct)) + unsafe.Offsetof(myStruct.unexportedField)))

// 修改未来暴露出来的字段的值
*ptrToUnexportedField = “World”

fmt.Printf(“%+v\n”, myStruct)
// {ExportedField:42 unexportedField:World}

fmt.Println(*ptrToUnexportedField)
// World

在这里,我们使用了 unsafe.Offsetof() 函数获取 unexportedField 字段在 MyStruct 结构体中的偏移量,然后将其加上结构体的起始地址,得到了该字段的地址。最后,我们将地址转换为 unsafe.Pointer 类型,以便进行进一步的操作。

获取锁的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
)

type Mutex struct {
sync.Mutex
}

func (m *Mutex) Count() int {
// 获取state字段的值
v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
v = v >> mutexWaiterShift + (v & mutexLocked)
return int(v)
}

// 锁是否被持有
func (m *Mutex) IsLocked() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexLocked == mutexLocked
}

// 是否有等待者被唤醒
func (m *Mutex) IsWoken() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexWoken == mutexWoken
}

// 锁是否处于饥饿状态
func (m *Mutex) IsStarving() bool {
state := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
return state&mutexStarving == mutexStarving
}
  • 如何理解 state&mutexLocked == mutexLocked?

这行代码执行了按位与操作 (&) 在 statemutexLocked 两个数之间。如果互斥锁被锁定,那么 state 的二进制表示的最低位就是1,而 mutexLocked 的二进制表示只有最低位是1,其它各位都是0。因此,按位与操作的结果就是一个只有最低位是1,其它各位都是0的值,即 mutexLocked

如果互斥锁没有被锁定,那么 state 的最低位就是0,按位与操作的结果也是0。所以,当 state&mutexLocked 的结果等于 mutexLocked 时,就说明了互斥锁已经被锁定

  • 如何理解 state&mutexWoken == mutexWoken?

如果互斥锁的等待状态被唤醒,那么 state 的二进制表示的第二个最低位就是1,而 mutexWoken 的二进制表示只有第二个最低位是1,其它各位都是0。因此,按位与操作的结果就是一个只有第二个最低位是1,其它各位都是0的值,即 mutexWoken

如果互斥锁的等待状态没有被唤醒,那么 state 的第二个最低位就是0,按位与操作的结果也是0。所以,当 state&mutexWoken 的结果等于 mutexWoken 时,就说明需要唤醒等待者。这个唤醒操作会在 Unlock 方法中执行,而且只会在必要的情况下进行,避免不必要的系统调用。

  • 如何理解 state&mutexStarving == mutexStarving?

如果互斥锁处于饥饿模式,那么 state 的二进制表示的第三个最低位就是1,而 mutexStarving 的二进制表示只有第三个最低位是1,其它各位都是0。因此,按位与操作的结果就是一个只有第三个最低位是1,其它各位都是0的值,即 mutexStarving

如果互斥锁没有处于饥饿模式,那么 state 的第三个最低位就是0,按位与操作的结果也是0。所以,当 state&mutexStarving 的结果等于 mutexStarving 时,就说明该互斥锁处于饥饿模式。

饥饿模式是指高优先级的 goroutine 在等待锁的过程中一直无法获得锁,而低优先级的 goroutine 却能够获得锁。为了避免这种情况,一旦一个 goroutine 等待互斥锁的时间超过了一定的阈值,该互斥锁就会进入饥饿模式,让等待时间更长的 goroutine 先获得锁。

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
var mu Mutex
for i := 0; i < 1000; i++ { // 启动1000个goroutine
go func() {
mu.Lock()
time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
mu.Unlock()
}()
}

time.Sleep(time.Second * 3)
// 输出锁的信息
fmt.Printf("等待者的数量: %d, 锁是否被持有: %t, 是否有等待者被唤醒: %t, 锁是否处于饥饿状态: %t\n", mu.Count(), mu.IsLocked(), mu.IsWoken(), mu.IsStarving())
// 等待者的数量: 998, 锁是否被持有: true, 是否有等待者被唤醒: false, 锁是否处于饥饿状态: false
}

线程安全的队列

Slice 实现的队列不是线程安全的,出队(Dequeue)和入队(Enqueue)会有 data race 的问题。这时可以使用 Mutex 来实现线程安全的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

type SliceQueue struct {
data []interface{}
mu sync.Mutex
}

func NewSliceQueue(n int) (q *SliceQueue) {
return &SliceQueue{data: make([]interface{}, 0, n)}
}

// Enqueue 把值放在队尾
func (q *SliceQueue) Enqueue(v interface{}) {
q.mu.Lock()
q.data = append(q.data, v)
q.mu.Unlock()
}

// Dequeue 移去队头并返回
func (q *SliceQueue) Dequeue() interface{} {
q.mu.Lock()
if len(q.data) == 0 {
q.mu.Unlock()
return nil
}
v := q.data[0]
q.data = q.data[1:]
q.mu.Unlock()
return v
}

Mutex 知识总结